package org.iot.spark.tools

import java.text.SimpleDateFormat
import java.util.regex.Pattern
import java.util.{Date, Properties}

import org.apache.commons.codec.digest.DigestUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.classification.InterfaceAudience.Public
import org.apache.hadoop.hbase.classification.InterfaceStability.Stable
import org.apache.hadoop.hbase.client.{Admin, Connection, ConnectionFactory, Get, Put, Result, Table}
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil
import org.apache.hadoop.hbase.util.{Bytes, VersionInfo}
import org.apache.hadoop.hbase.zookeeper.ZKConfig
import org.apache.hadoop.hbase.zookeeper.ZKConfig.ZKClusterKey
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, ConsumerStrategy, KafkaUtils, LocationStrategies}
import org.iot.spark.enity.{POITrafficData, TotalTrafficData, WindowTrafficData}
import org.iot.spark.processor.IotTrafficDataProcessor.prop
import org.iot.spark.util.PropertFileReader
import org.iot.spark.vo.{Constants, IotData}

import scala.collection.mutable


object HbaseTools {

  // Project configuration loaded once; shadows the `prop` imported from
  // IotTrafficDataProcessor inside this object.
  val prop: Properties = PropertFileReader.readPropertyFile()

  /**
   * Builds an HBase [[Connection]] from the ZooKeeper quorum/port configured
   * in the project property file.
   *
   * @return a live connection, or `null` when the connection could not be
   *         established (kept for backward compatibility with existing
   *         callers, which do not expect this method to throw).
   */
  def getHbaseConn: Connection = {
    try {
      val config: Configuration = HBaseConfiguration.create()
      config.set("hbase.zookeeper.quorum", prop.getProperty("hbase.zookeeper.quorum"))
      config.set("hbase.zookeeper.property.clientPort", prop.getProperty("hbase.zookeeper.property.clientPort"))
      ConnectionFactory.createConnection(config)
    } catch {
      case exception: Exception =>
        // The original called an unresolved `error(...)` (the deprecated
        // Predef.error, which throws). Log to stderr instead so the failure
        // is visible and `null` is actually returned as intended.
        System.err.println(s"Failed to obtain HBase connection: ${exception.getMessage}")
        null
    }
  }

  /**
   * Reads the Kafka offsets previously committed for `group`/`topics` from
   * the HBase offset table, creating that table on first use.
   *
   * Storage layout: row key = `group:topic`,
   * qualifier = `group:topic:partition`, value = offset as a decimal string.
   *
   * @param connection open HBase connection (owned by the caller)
   * @param admin      HBase admin (owned by the caller — NOT closed here; the
   *                   original closed it after `createTable`, causing a double
   *                   close in `getStreamingContextFromHBase`)
   * @return partition -> committed offset; empty when nothing is stored yet
   */
  def getOffsetFromHBase(connection: Connection, admin: Admin, topics: Array[String], group: String): collection.Map[TopicPartition, Long] = {
    val offsetTable = TableName.valueOf(Constants.HBASE_OFFSET_STORE_TABLE)
    if (!admin.tableExists(offsetTable)) {
      val descriptor = new HTableDescriptor(offsetTable)
      descriptor.addFamily(new HColumnDescriptor(Constants.HBASE_OFFSET_FAMILY_NAME))
      admin.createTable(descriptor)
    }
    val table = connection.getTable(offsetTable)
    val offsets = new mutable.HashMap[TopicPartition, Long]()
    try {
      for (topic <- topics) {
        val result: Result = table.get(new Get(Bytes.toBytes(group + ":" + topic)))
        for (cell <- result.rawCells()) {
          // Qualifier layout: group:topic:partition
          val qualifier: String = Bytes.toString(CellUtil.cloneQualifier(cell))
          val offsetValue: String = Bytes.toString(CellUtil.cloneValue(cell))
          val parts: Array[String] = qualifier.split(":")
          offsets += (new TopicPartition(parts(1), parts(2).toInt) -> offsetValue.toLong)
        }
      }
    } finally {
      table.close() // close even when a row is malformed (toInt/toLong may throw)
    }
    offsets
  }

  /**
   * Creates a Kafka direct stream, resuming from the offsets stored in HBase
   * when any exist, otherwise subscribing fresh.
   *
   * @param matchPattern regex matched against topic names (SubscribePattern)
   */
  def getStreamingContextFromHBase(streamingContext: StreamingContext, kafkaParams: Map[String, Object], topics: Array[String], group: String, matchPattern: String): InputDStream[ConsumerRecord[String, IotData]] = {
    val connection: Connection = getHbaseConn
    val admin: Admin = connection.getAdmin
    try {
      val storedOffsets = getOffsetFromHBase(connection, admin, topics, group)
      // Resume from stored offsets when present; otherwise start per the
      // consumer's auto.offset.reset policy.
      val strategy: ConsumerStrategy[String, IotData] =
        if (storedOffsets.nonEmpty)
          ConsumerStrategies.SubscribePattern[String, IotData](Pattern.compile(matchPattern), kafkaParams, storedOffsets)
        else
          ConsumerStrategies.SubscribePattern[String, IotData](Pattern.compile(matchPattern), kafkaParams)
      KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent, strategy)
    } finally {
      // Release resources even if stream creation fails (the original leaked
      // both on any exception above).
      admin.close()
      connection.close()
    }
  }

  /**
   * Persists one consumed Kafka offset to the HBase offset table so the next
   * run can resume from it.
   *
   * Row key = `group:topic`, qualifier = `group:topic:partition`,
   * value = offset rendered as a decimal string.
   */
  def saveBatchOffset(group: String, topic: String, partition: String, offset: Long): Unit = {
    val conn: Connection = HbaseTools.getHbaseConn
    try {
      val table: Table = conn.getTable(TableName.valueOf(Constants.HBASE_OFFSET_STORE_TABLE))
      try {
        val rowKey = group + ":" + topic
        val columnName = group + ":" + topic + ":" + partition
        val put = new Put(Bytes.toBytes(rowKey))
        put.addColumn(Constants.HBASE_OFFSET_FAMILY_NAME.getBytes(), columnName.getBytes(), offset.toString.getBytes())
        table.put(put)
      } finally {
        table.close()
      }
    } finally {
      conn.close() // was leaked when table.put threw
    }
  }

  /**
   * Writes `mapData` as one row (one cell per key/value pair) into
   * `tableName`, creating the table on first use. Row keys are salted to
   * spread writes across regions.
   *
   * @param rowKey  logical row key; the stored key is a 4-digit region bucket
   *                prefix + the first 12 hex chars of the key's MD5 digest
   */
  def putMapData(conn: Connection, tableName: String, rowKey: String, mapData: Map[String, Any]): Unit = {
    val admin: Admin = conn.getAdmin
    try {
      if (!admin.tableExists(TableName.valueOf(tableName))) {
        val tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName))
        tableDescriptor.addFamily(new HColumnDescriptor(Constants.DEFAULT_DB_FAMILY))
        admin.createTable(tableDescriptor)
      }
    } finally {
      admin.close() // was never closed in the original
    }

    // Salted row key: bucket = positive hash mod region count, zero-padded to
    // 4 digits, followed by the first 12 chars of the MD5 hex digest.
    def saltedRowKey(str: String, numRegion: Int): String = {
      val bucket: Int = (str.hashCode & Integer.MAX_VALUE) % numRegion
      StringUtils.leftPad(bucket.toString, 4, "0") + DigestUtils.md5Hex(str).substring(0, 12)
    }

    val table: Table = conn.getTable(TableName.valueOf(tableName))
    try {
      if (mapData.nonEmpty) { // an empty Put would make table.put throw
        val put = new Put(Bytes.toBytes(saltedRowKey(rowKey, Constants.DEFAULT_REGION_NUM)))
        for ((k, v) <- mapData) {
          // Write to the same family the table was created with. The original
          // hard-coded "MM" here — assumed equal to Constants.DEFAULT_DB_FAMILY;
          // if it were not, every put would have failed with
          // NoSuchColumnFamilyException.
          put.addColumn(Bytes.toBytes(Constants.DEFAULT_DB_FAMILY), Bytes.toBytes(String.valueOf(k)), Bytes.toBytes(String.valueOf(v)))
        }
        table.put(put)
      }
    } finally {
      table.close() // was leaked in the original (try block had no finally)
    }
  }

  /**
   * Routes a processed traffic record to its HBase table based on which
   * configured table-name prefix `interpreter` matches.
   *
   * All columns for a record are written with a single `putMapData` call
   * (the original issued four separate RPCs per record against the same row).
   */
  def saveBusinessDatas(interpreter: String, parse: Any, conn: Connection): Unit = {
    interpreter match {
      case t if t.startsWith(prop.getProperty("traffic.total.data")) =>
        val data = parse.asInstanceOf[TotalTrafficData]
        if (null != data.routeId && null != data.vehicleType) {
          val rowKey = data.vehicleType + "_" + data.routeId + "_" + data.recordDate
          putMapData(conn, prop.getProperty("traffic.total.data"), rowKey, Map(
            "routeId" -> data.routeId,
            "vehicleType" -> data.vehicleType,
            "recordDate" -> data.recordDate,
            "totalCount" -> data.totalCount.toString))
        }
      case t if t.startsWith(prop.getProperty("traffic.window.data")) =>
        val data = parse.asInstanceOf[WindowTrafficData]
        if (null != data.routeId && null != data.vehicleType) {
          val rowKey = data.vehicleType + "_" + data.routeId + "_" + data.recordDate
          putMapData(conn, prop.getProperty("traffic.window.data"), rowKey, Map(
            "routeId" -> data.routeId,
            "vehicleType" -> data.vehicleType,
            "recordDate" -> data.recordDate,
            "totalCount" -> data.totalCount.toString))
        }
      case t if t.startsWith(prop.getProperty("traffic.poi.data")) =>
        val data = parse.asInstanceOf[POITrafficData]
        if (null != data.vehicleId) {
          val rowKey = data.vehicleId + "_" + new SimpleDateFormat("yyyy-MM-dd").format(new Date)
          putMapData(conn, prop.getProperty("traffic.poi.data"), rowKey, Map(
            "vehicleId" -> data.vehicleId,
            "vehicleType" -> data.vehicleType,
            "distance" -> data.distance.toString))
        }
      case _ =>
        // The original passed two args to println (auto-tupled, printing a
        // tuple); use interpolation so the message renders as intended.
        println(s"######其他表##### : $interpreter")
    }
  }
}




